package com.ysb.config.rabbit.consumer;

import com.ysb.config.rabbit.AbstractRabbitConfig;
import com.ysb.constant.RabbitConstants;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * @author scaffolding
 * @since ${dateTime}
 */
@Configuration
public class RabbitConsumerConfig extends AbstractRabbitConfig {

    /**
     * Message Listener Container Configuration
     * <a href="https://docs.spring.io/spring-amqp/docs/2.3.16/reference/html/#containerAttributes">...</a>
     */
    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            ConnectionFactory drugQualityConnectionFactory,
            MessageConverter jsonMessageConverter,
            ThreadPoolTaskExecutor messageConsumerExecutor,
            RetryTemplate retryTemplate) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

        factory.setConnectionFactory(drugQualityConnectionFactory);
        // 手动确认
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 消息JSON转换
        factory.setMessageConverter(jsonMessageConverter);
        // 线程池
        factory.setTaskExecutor(messageConsumerExecutor);
        // 线程数
        factory.setConcurrentConsumers(RabbitConstants.MIN_CONSUMERS);
        // 最大线程数
        factory.setMaxConcurrentConsumers(RabbitConstants.MAX_CONSUMERS);
        // 增加consumer所需要的连续收到消息数，默认为10。考虑到消息特别多，为了保证消费效率，设为1
        factory.setConsecutiveActiveTrigger(1);
        // 减少consumer所需要的连续空闲数，默认为10。
        factory.setConsecutiveIdleTrigger(10);
        // 启动新的消费者最小时间间隔，默认10s
        factory.setStartConsumerMinInterval(1L);
        // 停止空闲消费者最小时间间隔，默认60s
        factory.setStopConsumerMinInterval(60L);
        // https://blog.rabbitmq.com/posts/2014/04/finding-bottlenecks-with-rabbitmq-3-3#consumer-utilisation
        factory.setPrefetchCount(30);
        /*
            拦截链，用来处理客户端与Rabbit服务器连接断开所导致的ACK/NACK失败的极端情况
            当消息确认模式为手动确认时，ACK/NACK失败会导致消息无限阻塞，直至连接恢复
         */
        factory.setAdviceChain(
                RetryInterceptorBuilder
                        // 无状态
                        .stateless()
                        // 消息回收：拒绝且不入队
                        .recoverer(new RejectAndDontRequeueRecoverer())
                        // 重试模板类
                        .retryOperations(retryTemplate)
                        .build());
        return factory;
    }

    /**
     * 重试模板类
     *
     * @param exponentialBackOffPolicy 重试的回滚策略
     * @param simpleRetryPolicy        重试策略
     * @return 重试模板类
     */
    @Bean
    public RetryTemplate retryTemplate(ExponentialBackOffPolicy exponentialBackOffPolicy,
                                       SimpleRetryPolicy simpleRetryPolicy) {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
        retryTemplate.setRetryPolicy(simpleRetryPolicy);
        return retryTemplate;
    }

    /**
     * 重试的回滚策略
     */
    @Bean
    public ExponentialBackOffPolicy exponentialBackOffPolicy() {
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        // 重试初始间隔
        backOffPolicy.setInitialInterval(3000);
        // 重试最大间隔
        backOffPolicy.setMaxInterval(12000);
        // 重试间隔乘法策略
        backOffPolicy.setMultiplier(2);
        return backOffPolicy;
    }

    /**
     * 重试策略
     */
    @Bean
    public SimpleRetryPolicy simpleRetryPolicy() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        // 最大重试次数
        retryPolicy.setMaxAttempts(3);
        return retryPolicy;
    }
}
